import functools
import pytest

from devtools_testutils import (
    AzureRecordedTestCase,
    EnvironmentVariableLoader,
)
from devtools_testutils.aio import recorded_by_proxy_async
from azure.core.credentials import AzureKeyCredential
from azure.ai.textanalytics.aio import TextAnalysisClient
from azure.ai.textanalytics.models import (
    MultiLanguageTextInput,
    MultiLanguageInput,
    TextPiiEntitiesRecognitionInput,
    AnalyzeTextPiiResult,
    PiiResultWithDetectedLanguage,
    PiiEntity,
    PiiActionContent,
    EntityMaskPolicyType,
    CharacterMaskPolicyType,
    SyntheticReplacementPolicyType,
)

TextAnalysisPreparer = functools.partial(
    EnvironmentVariableLoader,
    "text_analysis",
    text_analysis_endpoint="https://Sanitized.azure-api.net/",
    text_analysis_key="fake_key",
)


class TestTextAnalysis(AzureRecordedTestCase):
    def create_client(self, endpoint: str, key: str) -> TextAnalysisClient:
        return TextAnalysisClient(endpoint, AzureKeyCredential(key))


class TestTextAnalysisCase(TestTextAnalysis):
    @TextAnalysisPreparer()
    @recorded_by_proxy_async
    @pytest.mark.asyncio
    async def test_analyze_text_recognize_pii_redaction_policies_async(self, text_analysis_endpoint, text_analysis_key):
        async with self.create_client(text_analysis_endpoint, text_analysis_key) as client:

            # Documents
            documents = [
                MultiLanguageInput(
                    id="1",
                    text="My name is John Doe. My ssn is 123-45-6789. My email is john@example.com..",
                    language="en",
                ),
                MultiLanguageInput(
                    id="2",
                    text="My name is John Doe. My ssn is 123-45-6789. My email is john@example.com..",
                    language="en",
                ),
            ]

            text_input = MultiLanguageTextInput(multi_language_inputs=documents)

            # Redaction Policies
            default_policy = EntityMaskPolicyType(policy_name="defaultPolicy", is_default=True)

            ssn_policy = CharacterMaskPolicyType(
                policy_name="customMaskForSSN",
                unmask_length=4,
                unmask_from_end=False,
                entity_types=["USSocialSecurityNumber"],
            )

            synthetic_policy = SyntheticReplacementPolicyType(
                policy_name="syntheticMaskForPerson", entity_types=["Person", "Email"]
            )

            parameters = PiiActionContent(
                pii_categories=["All"], redaction_policies=[default_policy, ssn_policy, synthetic_policy]
            )

            body = TextPiiEntitiesRecognitionInput(text_input=text_input, action_content=parameters)

            # Async (non-LRO) call
            result = await client.analyze_text(body=body)

            # Basic validation
            assert result is not None
            assert isinstance(result, AnalyzeTextPiiResult)
            assert result.results is not None
            assert result.results.documents is not None
            assert len(result.results.documents) == 2

            for doc in result.results.documents:
                # Redacted text must exist and original PII must not appear
                redacted = doc.redacted_text
                assert redacted is not None
                assert "John Doe" not in redacted
                assert "123-45-6789" not in redacted
                assert "john@example.com" not in redacted

                # Must detect 3 PII entities
                assert len(doc.entities) == 3
                categories = {e.category for e in doc.entities}
                assert categories == {"Person", "USSocialSecurityNumber", "Email"}

                # Validate Person entity was replaced (synthetic replacement)
                person = next(e for e in doc.entities if e.category == "Person")
                assert person.mask is not None
                assert person.mask != person.text  # replaced with a different name

                # Validate SSN is masked with asterisks
                ssn = next(e for e in doc.entities if e.category == "USSocialSecurityNumber")
                assert ssn.mask is not None
                assert "*" in ssn.mask
                assert "123-45-6789" not in redacted

                # Validate Email is replaced (synthetic replacement)
                email = next(e for e in doc.entities if e.category == "Email")
                assert email.mask is not None
                assert email.mask != email.text
                assert "john@example.com" not in redacted
